/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2021-2021. All rights reserved.
 */

package autodispose;

import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.parallel.ParallelFlowable;

import org.reactivestreams.Subscriber;

/**
 * A {@link ParallelFlowable} that decorates another {@link ParallelFlowable} and a scope.
 *
 * <p>When subscribed to, every incoming {@link Subscriber} is wrapped in an
 * {@code AutoDisposingSubscriberImpl} constructed with the configured scope, and the wrapped
 * subscribers are then handed to the decorated source in the same order.
 *
 * @param <T> the type of items the subscribers accept
 */
final class AutoDisposeParallelFlowable<T> extends ParallelFlowable<T> implements ParallelFlowableSubscribeProxy<T> {
    /** The decorated flow that receives the wrapped subscribers. */
    private final ParallelFlowable<T> source;

    /** The scope handed to every wrapper created in {@code subscribe}. */
    private final CompletableSource scope;

    /**
     * Creates a proxy over {@code source} whose subscribers are wrapped together with {@code scope}.
     *
     * @param source the parallel flow to delegate to
     * @param scope the scope passed to each subscriber wrapper
     */
    AutoDisposeParallelFlowable(ParallelFlowable<T> source, CompletableSource scope) {
        this.scope = scope;
        this.source = source;
    }

    /**
     * Reports the parallelism of the decorated source.
     *
     * @return whatever {@code source.parallelism()} returns
     */
    @Override
    public int parallelism() {
        return source.parallelism();
    }

    /**
     * Wraps each given subscriber with the scope and subscribes the wrappers to the source.
     *
     * <p>Nothing is wrapped or forwarded when the {@code validate} check rejects the array.
     *
     * @param subscribers the subscribers to wrap, one wrapper is created per array element
     */
    @Override
    public void subscribe(Subscriber<? super T>[] subscribers) {
        if (validate(subscribers)) {
            final int count = subscribers.length;

            // Generic array creation is not expressible in Java, hence the raw array and suppression.
            @SuppressWarnings("unchecked")
            Subscriber<? super T>[] wrapped = new Subscriber[count];

            int index = 0;
            while (index < count) {
                AutoDisposingSubscriberImpl<? super T> scoped =
                    new AutoDisposingSubscriberImpl<>(scope, subscribers[index]);
                wrapped[index] = scoped;
                index++;
            }

            source.subscribe(wrapped);
        }
    }
}
